import asyncio
from logging.config import fileConfig

import alembic_postgresql_enum  # noqa: F401
from alembic import context
from sqlalchemy import Connection, MetaData, text

from dstack._internal.server.db import get_db
from dstack._internal.server.models import BaseModel, EnumAsString

config = context.config

if config.config_file_name is not None and config.attributes.get("configure_logging", True):
    fileConfig(config.config_file_name)

target_metadata = BaseModel.metadata


def set_target_metadata(metadata: MetaData):
    global target_metadata
    target_metadata = metadata


def render_item(type_, obj, autogen_context):
    """Apply custom rendering for selected items."""
    if type_ == "type" and isinstance(obj, EnumAsString):
        return f"sa.String(length={obj.length})"
    # default rendering for other objects
    return False


def run_migrations_offline():
    """Run migrations in 'offline' mode.
    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well.  By skipping the Engine creation
    we don't even need a DBAPI to be available.
    Calls to context.execute() here emit the given string to the
    script output.
    """
    context.configure(
        url=get_db().url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        render_item=render_item,
    )
    with context.begin_transaction():
        context.run_migrations()


# Programmatic API use (connection sharing) With Asyncio
# https://alembic.sqlalchemy.org/en/latest/cookbook.html#programmatic-api-use-connection-sharing-with-asyncio
def run_migrations_online():
    """Run migrations in 'online' mode.
    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connection = config.attributes.get("connection", None)
    if connection is None:
        asyncio.run(run_async_migrations())
    else:
        run_migrations(connection)


def run_migrations(connection: Connection):
    # Temporarily disable foreign keys,
    # so that sqlite batch table migrations are performed without data loss:
    # https://alembic.sqlalchemy.org/en/latest/batch.html#dealing-with-referencing-foreign-keys
    if connection.dialect.name == "sqlite":
        connection.execute(text("PRAGMA foreign_keys=OFF;"))
    elif connection.dialect.name == "postgresql":
        # lock_timeout is needed so that migrations that acquire locks
        # do not wait for locks forever, blocking live queries.
        # Better to fail and retry a deployment.
        connection.execute(text("SET lock_timeout='10s';"))
    connection.commit()
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
        render_as_batch=True,
        render_item=render_item,
        # Running each migration in a separate transaction.
        # Running all migrations in one transaction may lead to deadlocks in HA deployments
        # because lock ordering is not respected across all migrations.
        transaction_per_migration=True,
    )
    with context.begin_transaction():
        context.run_migrations()
    if connection.dialect.name == "sqlite":
        connection.execute(text("PRAGMA foreign_keys=ON;"))
    connection.commit()


async def run_async_migrations():
    engine = get_db().engine
    async with engine.connect() as connection:
        await connection.run_sync(run_migrations)
    await engine.dispose()


def main():
    if context.is_offline_mode():
        run_migrations_offline()
    else:
        run_migrations_online()


# invoked via alembic command
if __name__ == "env_py":
    main()
